/*
 * MIT License
 *
 * Copyright (c) 2023 北京凯特伟业科技有限公司
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
package com.je.connector.base.connection.impl;

import com.je.connector.base.connection.Connection;
import com.je.connector.base.connection.SessionContext;
import com.je.connector.base.connection.channel.NettyChannelSocketChannel;
import com.je.connector.base.connection.channel.SocketChannel;
import com.je.connector.base.exception.ConnectorException;
import com.je.connector.base.protocol.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

/**
 * 定义Netty的链接
 *
 * @ProjectName: instant-message
 * @Package: com.connector.base.connection.impl
 * @ClassName: NettyConnection
 * @Description: 定义Netty的链接
 * @Author: LIULJ
 * @Version: 1.0
 * <p>Copyright: Copyright (c) 2018</p>
 */
@Slf4j
@ToString
public final class NettyConnection implements Connection<Packet>, ChannelFutureListener {

    private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;

    /**
     * 默认超时时间，1s
     */
    private static final int DEFAULT_TIMEOUT = 1000;
    private SessionContext context;
    private SocketChannel socketClient;
    private volatile byte status = STATUS_NEW;
    private long lastReadTime;
    private long lastWriteTime;
    private int timeout = DEFAULT_TIMEOUT;
    private String userId;
    private String deviceId;
    private ByteBuf buf = ALLOCATOR.buffer();

    @Override
    public void init(SocketChannel socketClient) {
        this.socketClient = socketClient;
        this.context = new SessionContext();
        this.lastReadTime = System.currentTimeMillis();
        this.status = STATUS_CONNECTED;
    }

    @Override
    public SessionContext getSessionContext() {
        return context;
    }

    @Override
    public void setSessionContext(SessionContext context) {
        this.context = context;
    }

    @Override
    public ChannelFuture channelSend(Packet packet) {
        return channelSend(packet, null);
    }

    @Override
    public ChannelFuture channelSend(Packet packet, ChannelFutureListener listener) {
        NettyChannelSocketChannel client = (NettyChannelSocketChannel) socketClient;
        Channel thisChannel = client.getChannel();
        if (null != thisChannel && thisChannel.isActive() && this.isConnected()) {
            log.info("最后out信息的userId=,{}", userId);
            ChannelFuture future = null;
            buf.clear();
            if (buf.readableBytes() > 0) {
                buf.resetReaderIndex();
            }
            if (buf.writableBytes() > 0) {
                buf.resetWriterIndex();
            }
            buf.retain();
            TextWebSocketFrame frame = new TextWebSocketFrame(buf);
            buf.writeBytes(packet.getBody());
            thisChannel.writeAndFlush(frame).addListener(this);
            if (listener != null) {
                future.addListener(listener);
            }
            if (thisChannel.isWritable()) {
                return future;
            }
            return future;
        } else {
            try {
                return this.channelClose();
            } catch (Exception e) {
                return null;
            }
        }
    }


    @Override
    public void sendEvent(String eventName, Object... data) {
        throw new ConnectorException("this method do not support the netty raw connection!");
    }

    @Override
    public ChannelFuture channelClose() {
        if (status == STATUS_DISCONNECTED) {
            return null;
        }
        this.status = STATUS_DISCONNECTED;
        NettyChannelSocketChannel client = (NettyChannelSocketChannel) socketClient;
        if (Objects.isNull(client) || Objects.isNull(client.getChannel())) {
            return null;
        }
        return client.getChannel().close();
    }

    @Override
    public void disConnect() {
        NettyChannelSocketChannel nettyChannel = (NettyChannelSocketChannel) socketClient;
        nettyChannel.getChannel().close();
    }

    @Override
    public String getId() {
        NettyChannelSocketChannel client = (NettyChannelSocketChannel) socketClient;
        return client.getChannel().id().asShortText();
    }

    @Override
    public boolean isConnected() {
        return status == STATUS_CONNECTED;
    }

    @Override
    public boolean isReadTimeout() {
        return System.currentTimeMillis() - lastReadTime > context.getHeartbeat() + timeout;
    }

    @Override
    public boolean isWriteTimeout() {
        return System.currentTimeMillis() - lastWriteTime > context.getHeartbeat() - timeout;
    }

    @Override
    public void updateLastReadTime() {
        this.lastReadTime = System.currentTimeMillis();
    }

    @Override
    public void updateLastWriteTime() {
        this.lastWriteTime = System.currentTimeMillis();
    }

    @Override
    public SocketChannel getSocketClient() {
        return socketClient;
    }

    public String getUserId() {
        return this.userId;
    }

    public void setUserId(String Id) {
        this.userId = Id;
    }

    @Override
    public String getDeviceId() {
        return deviceId;
    }

    @Override
    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            lastWriteTime = System.currentTimeMillis();
        } else {
            log.error("connection send msg error", future.cause());
        }
    }
}
